package org.apache.spark.deploy.worker

import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.master.MasterMessages.{BoundPortsRequest, BoundPortsResponse, MasterResponse}
import org.apache.spark.deploy.worker.WorkerMessages.RegisterWorker
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.util.Utils
import org.apache.spark.{SecurityManager, SparkConf}

/**
  * Worker-side RPC endpoint.
  *
  * Currently answers a single message: [[BoundPortsRequest]], to which it
  * replies with the ports this worker is bound to.
  *
  * @param rpcEnv       the RpcEnv this endpoint is registered in
  * @param address      the RPC address this worker is reachable at
  * @param securityMgr  security manager used by the enclosing RpcEnv
  * @param conf         Spark configuration for this worker
  */
class Worker(override val rpcEnv: RpcEnv,
             address: RpcAddress,
             val securityMgr: SecurityManager,
             val conf: SparkConf) extends ThreadSafeRpcEndpoint with Logging{

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case BoundPortsRequest =>
      // NOTE(review): 23 / Some(23) look like placeholder values for the two
      // extra ports in BoundPortsResponse — confirm their intended meaning.
      val ports = BoundPortsResponse(address.port, 23, Some(23))
      context.reply(ports)
  }
}

/**
  * Entry point and bootstrap helpers for the worker process.
  */
object Worker extends Logging{
  val hostname = Utils.localHostName()
  val port = 7078
  val SYSTEM_NAME = "sparkWorker"
  val ENDPOINT_NAME = "Worker"

  /** Default master list: a single master on the local host's standard port. */
  val masters = Array[String](s"spark://$hostname:7077")

  def main(args: Array[String]): Unit = {
    Utils.initDaemon(log)
    val conf = new SparkConf
    val rpcEnv = startRpcEnvAndEndpoint(hostname, port, masters, conf = conf)
    // Block the main thread until the RpcEnv shuts down.
    rpcEnv.awaitTermination()
  }

  /**
    * Creates the worker's RpcEnv, registers the [[Worker]] endpoint in it,
    * and announces this worker to every master in `masterUrls`.
    *
    * @param masterUrls   master URLs in `spark://host:port` form
    * @param workerNumber optional suffix so LocalSparkCluster can run several
    *                     distinct sparkWorkerX RPC environments on one host
    * @return the started RpcEnv (caller is expected to await its termination)
    */
  def startRpcEnvAndEndpoint(
                              host: String,
                              port: Int,
                              masterUrls: Array[String],
                              workerNumber: Option[Int] = None,
                              conf: SparkConf = new SparkConf): RpcEnv = {

    // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
    val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
    // FIX: the Worker endpoint was defined but never registered, so it could
    // never receive BoundPortsRequest. Register it under ENDPOINT_NAME.
    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, RpcAddress(host, port), securityMgr, conf))
    val masterEndpointRefs = masterAddresses.map{
      rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME)
    }
    // FIX: register with the `host`/`port` this method was asked to use, not
    // the object-level `hostname` (they only coincide when called from main()).
    // NOTE(review): the ask[...] Futures are discarded, so a failed
    // registration is silently swallowed — consider observing the results.
    masterEndpointRefs.foreach(_.ask[MasterResponse](RegisterWorker(host, port)))
    rpcEnv
  }
}
